/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.transforms;



import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.KvCoder;

import com.bff.gaia.unified.sdk.coders.VoidCoder;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.PaneInfo;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionView;

import com.bff.gaia.unified.sdk.values.TimestampedValue;

import com.bff.gaia.unified.sdk.values.ValueInSingleWindow;

import org.joda.time.Duration;

import org.joda.time.Instant;



/**

 * {@link PTransform PTransforms} for converting between explicit and implicit form of various Unified

 * values.

 */

public class Reify {

  private static class ReifyView<K, V> extends PTransform<PCollection<K>, PCollection<KV<K, V>>> {

    private final PCollectionView<V> view;

    private final Coder<V> coder;



    private ReifyView(PCollectionView<V> view, Coder<V> coder) {

      this.view = view;

      this.coder = coder;

    }



    @Override

    public PCollection<KV<K, V>> expand(PCollection<K> input) {

      return input

          .apply(

              ParDo.of(

                      new DoFn<K, KV<K, V>>() {

                        @ProcessElement

                        public void process(ProcessContext c) {

                          c.output(KV.of(c.element(), c.sideInput(view)));

                        }

                      })

                  .withSideInputs(view))

          .setCoder(KvCoder.of(input.getCoder(), coder));

    }

  }



  private static class ReifyViewInGlobalWindow<V> extends PTransform<PBegin, PCollection<V>> {

    private final PCollectionView<V> view;

    private final Coder<V> coder;



    private ReifyViewInGlobalWindow(PCollectionView<V> view, Coder<V> coder) {

      this.view = view;

      this.coder = coder;

    }



    @Override

    public PCollection<V> expand(PBegin input) {

      return input

          .apply(Create.of((Void) null).withCoder(VoidCoder.of()))

          .apply(Reify.viewAsValues(view, coder))

          .apply(Values.create());

    }

  }



  /** Private implementation of {@link #windows()}. */

  private static class Window<T>

      extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {

    @Override

    public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) {

      return input

          .apply(

              ParDo.of(

                  new DoFn<T, ValueInSingleWindow<T>>() {

                    @ProcessElement

                    public void processElement(

                        @Element T element,

                        @Timestamp Instant timestamp,

                        BoundedWindow window,

                        PaneInfo pane,

                        OutputReceiver<ValueInSingleWindow<T>> r) {

                      r.outputWithTimestamp(

                          ValueInSingleWindow.of(element, timestamp, window, pane), timestamp);

                    }

                  }))

          .setCoder(

              ValueInSingleWindow.Coder.of(

                  input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()));

    }

  }



  private static class Timestamp<T>

      extends PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> {

    @Override

    public PCollection<TimestampedValue<T>> expand(PCollection<T> input) {

      return input

          .apply(

              ParDo.of(

                  new DoFn<T, TimestampedValue<T>>() {

                    @ProcessElement

                    public void processElement(

                        @Element T element,

                        @Timestamp Instant timestamp,

                        OutputReceiver<TimestampedValue<T>> r) {

                      r.output(TimestampedValue.of(element, timestamp));

                    }

                  }))

          .setCoder(TimestampedValue.TimestampedValueCoder.of(input.getCoder()));

    }

  }



  private static class WindowInValue<K, V>

      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>> {

    @Override

    public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> input) {

      KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();

      return input

          .apply(

              ParDo.of(

                  new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {

                    @ProcessElement

                    public void processElement(

                        @Element KV<K, V> element,

                        @Timestamp Instant timestamp,

                        BoundedWindow window,

                        PaneInfo pane,

                        OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {

                      r.output(

                          KV.of(

                              element.getKey(),

                              ValueInSingleWindow.of(element.getValue(), timestamp, window, pane)));

                    }

                  }))

          .setCoder(

              KvCoder.of(

                  coder.getKeyCoder(),

                  ValueInSingleWindow.Coder.of(

                      coder.getValueCoder(),

                      input.getWindowingStrategy().getWindowFn().windowCoder())));

    }

  }



  private static class TimestampInValue<K, V>

      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> {

    @Override

    public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<KV<K, V>> input) {

      KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();

      return input

          .apply(

              ParDo.of(

                  new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {

                    @ProcessElement

                    public void processElement(

                        @Element KV<K, V> element,

                        @Timestamp Instant timestamp,

                        OutputReceiver<KV<K, TimestampedValue<V>>> r) {

                      r.output(

                          KV.of(

                              element.getKey(),

                              TimestampedValue.of(element.getValue(), timestamp)));

                    }

                  }))

          .setCoder(

              KvCoder.of(coder.getKeyCoder(), TimestampedValue.TimestampedValueCoder.of(coder.getValueCoder())));

    }

  }



  private static class ExtractTimestampsFromValues<K, V>

      extends PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> {

    @Override

    public PCollection<KV<K, V>> expand(PCollection<KV<K, TimestampedValue<V>>> input) {

      KvCoder<K, TimestampedValue<V>> kvCoder = (KvCoder<K, TimestampedValue<V>>) input.getCoder();

      TimestampedValue.TimestampedValueCoder<V> tvCoder = (TimestampedValue.TimestampedValueCoder<V>) kvCoder.getValueCoder();

      return input

          .apply(

              ParDo.of(

                  new DoFn<KV<K, TimestampedValue<V>>, KV<K, V>>() {

                    @Override

                    public Duration getAllowedTimestampSkew() {

                      return Duration.millis(Long.MAX_VALUE);

                    }



                    @ProcessElement

                    public void processElement(

						@Element KV<K, TimestampedValue<V>> kv, OutputReceiver<KV<K, V>> r) {

                      r.outputWithTimestamp(

                          KV.of(kv.getKey(), kv.getValue().getValue()),

                          kv.getValue().getTimestamp());

                    }

                  }))

          .setCoder(KvCoder.of(kvCoder.getKeyCoder(), tvCoder.getValueCoder()));

    }

  }



  private Reify() {}



  /**

   * Create a {@link PTransform} that will output all inputs wrapped in a {@link TimestampedValue}.

   */

  public static <T> PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> timestamps() {

    return new Timestamp<>();

  }



  /**

   * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside

   * the value.

   */

  public static <K, V>

  PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>

          timestampsInValue() {

    return new TimestampInValue<>();

  }



  /**

   * Create a {@link PTransform} that will reify information from the processing context into

   * instances of {@link ValueInSingleWindow}.

   *

   * @param <T> element type

   */

  public static <T> PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> windows() {

    return new Window<>();

  }



  /**

   * Create a {@link PTransform} that will output all input {@link KV KVs} with the window pane info

   * inside the value.

   */

  public static <K, V>

  PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>>

          windowsInValue() {

    return new WindowInValue<>();

  }



  /** Extracts the timestamps from each value in a {@link KV}. */

  public static <K, V>

  PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>

          extractTimestampsFromValues() {

    return new ExtractTimestampsFromValues<>();

  }



  /**

   * Pairs each element in a collection with the value of a side input associated with the element's

   * window.

   */

  public static <K, V> PTransform<PCollection<K>, PCollection<KV<K, V>>> viewAsValues(

	  PCollectionView<V> view, Coder<V> coder) {

    return new ReifyView<>(view, coder);

  }



  /**

   * Returns a {@link PCollection} consisting of a single element, containing the value of the given

   * view in the global window.

   */

  public static <K, V> PTransform<PBegin, PCollection<V>> viewInGlobalWindow(

	  PCollectionView<V> view, Coder<V> coder) {

    return new ReifyViewInGlobalWindow<>(view, coder);

  }

}